package com.lianda.log.function;

import com.lianda.alert.model.LogEvent;
import com.lianda.log.model.OriginalLogEvent;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Flink flat-map operator that converts an {@link OriginalLogEvent} into a {@link LogEvent}.
 *
 * <p>For each non-null input record exactly one {@link LogEvent} is emitted. Its type is derived
 * from the input's source string and its message is copied from the input unchanged. Null input
 * records are dropped without emitting anything.
 */
public class OriLog2LogEventFlatMapFunction extends RichFlatMapFunction<OriginalLogEvent, LogEvent> {

    /**
     * Maps one original log record to a {@link LogEvent} and emits it.
     *
     * @param originalLogEvent the incoming record; when {@code null} nothing is emitted
     * @param collector        the collector that receives the converted event
     * @throws Exception declared by the Flink flat-map contract; not thrown explicitly here
     */
    @Override
    public void flatMap(OriginalLogEvent originalLogEvent, Collector<LogEvent> collector) throws Exception {
        if (originalLogEvent == null) {
            return;
        }

        LogEvent logEvent = new LogEvent();
        logEvent.setType(resolveType(originalLogEvent.getSource()));
        logEvent.setMessage(originalLogEvent.getMessage());

        // Without this call the converted event is built and then discarded,
        // so nothing would ever reach downstream operators.
        collector.collect(logEvent);
    }

    /**
     * Derives the event type from the source string by substring match.
     *
     * <p>The checks run in a fixed order ("middleware", then "app", then "docker"), so a source
     * containing several of these keywords gets the type of the first one that matches. A
     * {@code null} source, or one matching none of the keywords, yields {@code "MACHINE"}.
     *
     * @param source the source string of the original record, possibly {@code null}
     * @return one of {@code "MIDDLEWARE"}, {@code "APP"}, {@code "DOCKER"} or {@code "MACHINE"}
     */
    private static String resolveType(String source) {
        if (source == null) {
            // A record without a source cannot be classified; use the same fallback
            // as an unrecognised source instead of failing with a NullPointerException.
            return "MACHINE";
        }
        if (source.contains("middleware")) {
            return "MIDDLEWARE";
        }
        if (source.contains("app")) {
            return "APP";
        }
        if (source.contains("docker")) {
            return "DOCKER";
        }
        return "MACHINE";
    }
}
